In [1]:
:extension OverloadedStrings


Introduction

This notebook guides you through the first steps of using Krapsh.

In order to run it, you must start a Spark server with the Krapsh Scala server loaded into it.

If you are already familiar with Spark, you will find that the Krapsh code is organized in roughly the same way.


In [3]:
import Spark.Core.Dataset
import Spark.Core.Context
import Spark.Core.Functions
import Spark.Core.Column
import Spark.Core.ColumnFunctions

Krapsh communicates with Spark through a session object called a SparkSession. For interactive exploration, an implicit session can be created for the notebook. This pattern is not recommended for production code, but it lets you try things quickly in a notebook.

Create a configuration object. You can specify the location of the Spark endpoint. Calling createSparkSessionDef will allocate a default session. Note that all the xxxDef functions also have a xxx equivalent that takes or returns a session.


In [4]:
let conf = defaultConf {
        confEndPoint = "http://localhost",
        confRequestedSessionName = "session00_introduction" }
print conf
createSparkSessionDef conf


SparkSessionConf {confEndPoint = "http://localhost", confPort = 8081, confPollingIntervalMillis = 500, confRequestedSessionName = "session00_introduction"}
[Debug] Creating spark session at url: http://localhost:8081/session/session00_introduction @(<unknown>:<unknown> <unknown>:0:0)
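
The printed configuration above also shows a confPort field (8081 by default). If your Krapsh server listens on a different port, the same record-update syntax covers it. This is only a sketch; the port value below is made up:

let conf' = defaultConf {
        confEndPoint = "http://localhost",
        confPort = 8082,
        confRequestedSessionName = "session00_introduction" }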

Let us run our first program on Spark. We are going to create a tiny dataset and compute the number of elements in this dataset.

Creating a sequence of Spark operations does not require a session: at this point, you declare the operations that you want to do.

The command to create a dataset from existing elements is (surprise) dataset:


In [5]:
let ds = dataset ([1, 2, 3, 4] :: [Int])
ds


constant_e4756b@org.spark.Constant{int}

In order to count the number of elements, we are just going to use the built-in count command.

Unlike its Spark counterpart, this command is declarative and lazy: no computation happens when it is called. It returns an observable that we can combine with other nodes or evaluate.


In [6]:
let c = count ds
c


count_a269fb@org.spark.Count!{int}

In [7]:
:type c


c :: LocalData Int

In order to query the value and execute the computation graph, you need to call one of the exec commands. This analyzes the computation graph for possible errors, sends it to Spark for execution, and returns the result.

In this notebook, we will use the default execution context, which is implicitly used when calling exec1Def. For production cases, you should pass your own context and use exec1.

You can only send observables. Dataframes cannot be evaluated directly.
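
As a hedged illustration of that last point: an aggregation such as count turns a dataset into an observable that the exec commands can consume, while the dataset itself cannot be sent. (Treating this as a type error is an assumption based on the prose above, not an output of this notebook.)

let ok = count ds   -- LocalData Int: an observable, fine to execute
-- exec1Def ds      -- assumed to be rejected: ds is a distributed dataset;
--                     reduce it to an observable first (e.g. with count)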


In [8]:
mycount <- exec1Def c


[Info] Sending computations at url: http://localhost:8081/computation/session00_introduction/0/create @(<unknown>:<unknown> <unknown>:0:0)
[Debug] Sending computations status request at url: http://localhost:8081/status/session00_introduction/0/count_a269fb @(<unknown>:<unknown> <unknown>:0:0)
[Debug] Sending computations status request at url: http://localhost:8081/status/session00_introduction/0/count_a269fb @(<unknown>:<unknown> <unknown>:0:0)
[Info] count_a269fb finished: success @(<unknown>:<unknown> <unknown>:0:0)

As expected, mycount is an integer with the value 4:


In [9]:
:t mycount
mycount


mycount :: Int
4

Caching

If you execute the same code again, you will find that Krapsh runs it much faster:


In [11]:
_ <- exec1Def c


[Info] Sending computations at url: http://localhost:8081/computation/session00_introduction/1/create @(<unknown>:<unknown> <unknown>:0:0)
[Debug] Sending computations status request at url: http://localhost:8081/status/session00_introduction/1/count_a269fb @(<unknown>:<unknown> <unknown>:0:0)
[Info] count_a269fb finished: success @(<unknown>:<unknown> <unknown>:0:0)

Computations in Krapsh are completely deterministic: the same computation graph always returns exactly the same result. Thanks to this property, Krapsh can aggressively cache final and intermediate results, and reuse them to skip whole chunks of computation. Furthermore, since the computation graph fully describes the computation, it can be saved alongside the data as a record of how the result was generated, guaranteeing reproducible results.

Because some operations in Spark are intrinsically non-deterministic, this may require some changes to existing code. For example:

  • some operations such as collect always sort their results, so that the result is independent of the data layout
  • random is not available yet. Some strategies based on hashing are being considered.
  • current_time will most probably never be available within Krapsh. However, the current time can be retrieved from the environment and passed as a constant (see the sketch after this list).
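
For the last bullet, here is a minimal sketch in plain Haskell of that pattern (buildGraph is a hypothetical stand-in for whatever pure graph-building code needs the timestamp): the non-deterministic read of the clock stays in IO, and the graph-building code receives the time as an ordinary constant argument.

import Data.Time.Clock (UTCTime, getCurrentTime)

-- Hypothetical stand-in for pure graph-building code that needs a timestamp.
buildGraph :: UTCTime -> String
buildGraph t = "graph built with timestamp " ++ show t

main :: IO ()
main = do
  now <- getCurrentTime      -- the only non-deterministic step, outside the graph
  putStrLn (buildGraph now)  -- graph construction itself stays pure and reproducible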

Note that this is a preview, so caching is not complete yet.

For example, when distributing and collecting a dataset, the order of the initial data does not matter:


In [12]:
set = dataset ([1,2,3] :: [Int])
x = collect (asCol set)
exec1Def x


[Info] Sending computations at url: http://localhost:8081/computation/session00_introduction/2/create @(<unknown>:<unknown> <unknown>:0:0)
[Debug] Sending computations status request at url: http://localhost:8081/status/session00_introduction/2/collect_b862c2 @(<unknown>:<unknown> <unknown>:0:0)
[Debug] Sending computations status request at url: http://localhost:8081/status/session00_introduction/2/collect_b862c2 @(<unknown>:<unknown> <unknown>:0:0)
[Info] collect_b862c2 finished: success @(<unknown>:<unknown> <unknown>:0:0)
[1,2,3]

In [13]:
set = dataset ([3,2,1] :: [Int]) -- Data is reversed, but the output is the same.
x = collect (asCol set)
exec1Def x


[Info] Sending computations at url: http://localhost:8081/computation/session00_introduction/3/create @(<unknown>:<unknown> <unknown>:0:0)
[Debug] Sending computations status request at url: http://localhost:8081/status/session00_introduction/3/collect_bb71c5 @(<unknown>:<unknown> <unknown>:0:0)
[Debug] Sending computations status request at url: http://localhost:8081/status/session00_introduction/3/collect_bb71c5 @(<unknown>:<unknown> <unknown>:0:0)
[Info] collect_bb71c5 finished: success @(<unknown>:<unknown> <unknown>:0:0)
[1,2,3]

Conclusion

This is the end of this first demonstration. From this point, you can explore different topics:

  • working with cached data
  • organizing computations with paths and scopes
